// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <functional>
#include <optional>
#include <vector>

#include "arrow/acero/visibility.h"
#include "arrow/compute/exec.h"
#include "arrow/result.h"

namespace arrow {
namespace acero {
namespace util {

using arrow::compute::ExecBatch;

/// \brief A container that accumulates batches until they are ready to
///        be processed.
class ARROW_ACERO_EXPORT AccumulationQueue {
 public:
  AccumulationQueue() : row_count_(0) {}
  ~AccumulationQueue() = default;

  // We should never be copying ExecBatch around
  AccumulationQueue(const AccumulationQueue&) = delete;
  AccumulationQueue& operator=(const AccumulationQueue&) = delete;

  AccumulationQueue(AccumulationQueue&& that);
  AccumulationQueue& operator=(AccumulationQueue&& that);

  void Concatenate(AccumulationQueue&& that);
  void InsertBatch(ExecBatch batch);
  int64_t row_count() { return row_count_; }
  size_t batch_count() { return batches_.size(); }
  bool empty() const { return batches_.empty(); }
  void Clear();
  ExecBatch& operator[](size_t i);

 private:
  int64_t row_count_;
  std::vector<ExecBatch> batches_;
};

/// A queue that sequences incoming batches
///
/// This can be used when a node needs to do some kind of ordered processing on
/// the stream.
///
/// Batches can be inserted in any order.  The process_callback will be called on
/// the batches, in order, without reentrant calls. For this reason the callback
/// should be quick.
///
/// For example, in a top-n node, the process callback should determine how many
/// rows need to be delivered for the given batch, and then return a task to actually
/// deliver those rows.
class ARROW_ACERO_EXPORT SequencingQueue {
 public:
  using Task = std::function<Status()>;

  /// Strategy that describes how to handle items
  class Processor {
   public:
    /// Process the batch, potentially generating a task
    ///
    /// This method will be called on each batch in order.  Calls to this method
    /// will be serialized and it will not be called reentrantly.  This makes it
    /// safe to do things that rely on order but minimal time should be spent here
    /// to avoid becoming a bottleneck.
    ///
    /// \return a follow-up task that will be scheduled.  The follow-up task(s) are
    ///         is not guaranteed to run in any particular order.  If nullopt is
    ///         returned then nothing will be scheduled.
    virtual Result<std::optional<Task>> Process(ExecBatch batch) = 0;
    /// Schedule a task
    virtual void Schedule(Task task) = 0;
  };

  virtual ~SequencingQueue() = default;

  /// Insert a batch into the queue
  ///
  /// This will insert the batch into the queue.  If this batch was the next batch
  /// to deliver then this will trigger 1+ calls to the process callback to generate
  /// 1+ tasks.
  ///
  /// The task generated by this call will be executed immediately.  The remaining
  /// tasks will be scheduled using the schedule callback.
  ///
  /// From a data pipeline perspective the sequencing queue is a "sometimes" breaker.  If
  /// a task arrives in order then this call will usually execute the downstream pipeline.
  /// If this task arrives early then this call will only queue the data.
  virtual Status InsertBatch(ExecBatch batch) = 0;

  /// Create a queue
  /// \param processor describes how to process the batches, must outlive the queue
  static std::unique_ptr<SequencingQueue> Make(Processor* processor);
};

/// A queue that sequences incoming batches
///
/// Unlike SequencingQueue the Process method is not expected to schedule new tasks.
///
/// If a batch arrives and another thread is currently processing then the batch
/// will be queued and control will return.  In other words, delivery of batches will
/// not block on the Process method.
///
/// It can be helpful to think of this as if a dedicated thread is running Process as
/// batches arrive
class ARROW_ACERO_EXPORT SerialSequencingQueue {
 public:
  /// Strategy that describes how to handle items
  class Processor {
   public:
    virtual ~Processor() = default;
    /// Process the batch
    ///
    /// This method will be called on each batch in order.  Calls to this method
    /// will be serialized and it will not be called reentrantly.  This makes it
    /// safe to do things that rely on order.
    ///
    /// If this falls behind then data may accumulate
    ///
    /// TODO: Could add backpressure if needed but right now all uses of this should
    ///       be pretty fast and so are unlikely to block.
    virtual Status Process(ExecBatch batch) = 0;
  };

  virtual ~SerialSequencingQueue() = default;

  /// Insert a batch into the queue
  ///
  /// This will insert the batch into the queue.  If this batch was the next batch
  /// to deliver then this may trigger calls to the processor which will be run
  /// as part of this call.
  virtual Status InsertBatch(ExecBatch batch) = 0;

  /// Create a queue
  /// \param processor describes how to process the batches, must outlive the queue
  static std::unique_ptr<SerialSequencingQueue> Make(Processor* processor);
};

}  // namespace util
}  // namespace acero
}  // namespace arrow
